{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img width=\"200\" src=\"https://mmlspark.blob.core.windows.net/graphics/emails/vw-blue-dark-orange.svg\" />\n",
    "\n",
    "# Contextual Bandits using Vowpal Wabbit\n",
    "\n",
    "In the contextual bandit problem, a learner repeatedly observes a context, chooses an action, and observes a loss/cost/reward for the chosen action only. Contextual bandit algorithms use this additional side information (the context) to aid real-world decision-making. They work well for choosing actions in dynamic environments where options change rapidly and the set of available actions is limited.\n",
    "\n",
    "For an in-depth tutorial, see [Contextual bandits and Vowpal Wabbit](https://vowpalwabbit.org/docs/vowpal_wabbit/python/latest/tutorials/python_Contextual_bandits_and_Vowpal_Wabbit.html).\n",
    "\n",
    "[Azure Personalizer](https://azure.microsoft.com/en-us/products/cognitive-services/personalizer) emits logs in DSJSON format. This example demonstrates how to perform off-policy evaluation on such logs.\n"
   ]
  },
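  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Off-policy evaluation estimates how a different policy would have performed using only the logged decisions, their logged probabilities, and the observed rewards. As a rough illustration, the next cell sketches two common estimators, inverse propensity scoring (IPS) and its self-normalized variant (SNIPS), in plain Python. The records, the `target_policy` function, and all other names are hypothetical and are not taken from the Personalizer logs; the sketch is not meant to reproduce SynapseML's implementation."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative sketch only: hypothetical logged data, not the Personalizer log.\n",
    "# Each record holds the logged action, the probability the logging policy\n",
    "# assigned to it, and the observed reward.\n",
    "logged = [\n",
    "    {\"action\": 0, \"prob\": 0.5, \"reward\": 1.0},\n",
    "    {\"action\": 1, \"prob\": 0.25, \"reward\": 0.0},\n",
    "    {\"action\": 0, \"prob\": 0.5, \"reward\": 0.0},\n",
    "    {\"action\": 2, \"prob\": 0.25, \"reward\": 1.0},\n",
    "]\n",
    "\n",
    "\n",
    "def target_policy(record):\n",
    "    # Hypothetical deterministic policy to evaluate: always choose action 0.\n",
    "    return 0\n",
    "\n",
    "\n",
    "def ips_and_snips(records, policy):\n",
    "    # Importance weight is 1/p when the target policy agrees with the log, else 0.\n",
    "    weights = [\n",
    "        (1.0 / r[\"prob\"]) if policy(r) == r[\"action\"] else 0.0 for r in records\n",
    "    ]\n",
    "    weighted_reward = sum(w * r[\"reward\"] for w, r in zip(weights, records))\n",
    "    # IPS normalizes by the number of records, SNIPS by the sum of the weights.\n",
    "    ips = weighted_reward / len(records)\n",
    "    total_weight = sum(weights)\n",
    "    snips = weighted_reward / total_weight if total_weight > 0 else float(\"nan\")\n",
    "    return ips, snips\n",
    "\n",
    "\n",
    "ips_estimate, snips_estimate = ips_and_snips(logged, target_policy)\n",
    "print(\"IPS estimate:\", ips_estimate)\n",
    "print(\"SNIPS estimate:\", snips_estimate)"
   ]
  },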
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Read the dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark.sql.types as T\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "schema = T.StructType(\n",
    "    [\n",
    "        T.StructField(\"input\", T.StringType(), False),\n",
    "    ]\n",
    ")\n",
    "\n",
    "df = (\n",
    "    spark.read.format(\"text\")\n",
    "    .schema(schema)\n",
    "    .load(\"wasbs://publicwasb@mmlspark.blob.core.windows.net/decisionservice.json\")\n",
    ")\n",
    "# print dataset basic info\n",
    "print(\"records read: \" + str(df.count()))\n",
    "print(\"Schema: \")\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 2: Use VowpalWabbitDSJsonTransformer to extract header fields from the DSJSON logs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.vw import VowpalWabbitDSJsonTransformer\n",
    "\n",
    "df_train = (\n",
    "    VowpalWabbitDSJsonTransformer()\n",
    "    .setDsJsonColumn(\"input\")\n",
    "    .transform(df)\n",
    "    .withColumn(\"splitId\", F.lit(0))\n",
    "    .repartition(2)\n",
    ")\n",
    "\n",
    "# Show structured nature of rewards\n",
    "df_train.printSchema()\n",
    "\n",
    "# Drop the raw JSON column to keep the displayed output readable\n",
    "display(df_train.drop(\"input\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3: Train model\n",
    "\n",
    "VowpalWabbitGeneric performs these steps:\n",
    "\n",
    "* trains a model for each split (=group)\n",
    "* synchronizes across partitions after every split\n",
    "* stores the 1-step ahead predictions in the model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.vw import VowpalWabbitGeneric\n",
    "\n",
    "model = (\n",
    "    VowpalWabbitGeneric()\n",
    "    .setPassThroughArgs(\n",
    "        \"--cb_adf --cb_type mtr --clip_p 0.1 -q GT -q MS -q GR -q OT -q MT -q OS --dsjson --preserve_performance_counters\"\n",
    "    )\n",
    "    .setInputCol(\"input\")\n",
    "    .setSplitCol(\"splitId\")\n",
    "    .setPredictionIdCol(\"EventId\")\n",
    "    .fit(df_train)\n",
    ")"
   ]
  },
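  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A 1-step ahead prediction is commonly understood as the prediction made for an example before the model has learned from that example, which makes it usable for evaluating the model on data it has not yet seen. The next cell is a minimal sketch of that predict-then-learn loop with a toy one-weight regressor. The data stream, the learning rate, and the variable names are assumptions for illustration, and the sketch does not use VowpalWabbitGeneric."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Illustrative sketch only: a toy online learner, not VowpalWabbitGeneric.\n",
    "# Hypothetical stream of (feature, label) pairs.\n",
    "stream = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]\n",
    "\n",
    "weight = 0.0\n",
    "learning_rate = 0.05\n",
    "one_step_ahead = []\n",
    "\n",
    "for x, y in stream:\n",
    "    # Predict first, using only what was learned from earlier examples.\n",
    "    prediction = weight * x\n",
    "    one_step_ahead.append(prediction)\n",
    "    # Then update the weight with a gradient step on the squared error.\n",
    "    weight += learning_rate * (y - prediction) * x\n",
    "\n",
    "print(one_step_ahead)"
   ]
  },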
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 4: Predict and evaluate"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
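    "# 1-step ahead predictions stored in the model during training\n",
    "df_predictions = model.getOneStepAheadPredictions()\n",
    "# Header columns extracted from the logs, without the raw JSON\n",
    "df_headers = df_train.drop(\"input\")\n",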
    "\n",
    "df_headers_predictions = df_headers.join(df_predictions, \"EventId\")\n",
    "display(df_headers_predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.vw import VowpalWabbitCSETransformer\n",
    "\n",
    "metrics = VowpalWabbitCSETransformer().transform(df_headers_predictions)\n",
    "\n",
    "display(metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The metrics are calculated separately for each field of the reward column."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "per_reward_metrics = metrics.select(\"reward.*\")\n",
    "\n",
    "display(per_reward_metrics)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
